S3上のファイルの文字列置換を部分的にメモリに読み出しながら実施する方法
データアナリティクス事業本部のueharaです。
今回は、S3上のファイル内の文字列の置換を部分的にメモリに読み出しながら実施する方法を紹介してみたいと思います。
なぜ部分的に読み込みながら処理したいのか?
例えば、Glue (Python Shell)でのデータETL処理の過程でS3上にあるファイル内の文字列の置換を実施したいケースを考えます。
以下の記事でご紹介しているように、Glue (Python Shell)で自由に使える一時ディレクトリの容量は約14GB程度となります。
したがって、例えば対象ファイルが20GBであれば「Glueの一時ディレクトリにS3からファイルをダウンロードし、置換処理を実施した後、処理したファイルをS3にアップロードする」という処理が不可能になります。
ファイルが10GB(<14GB)と過程した場合でも、仮に利用しているGlueのDPUが1/16DPUであればメモリに乗るデータ量はせいぜい1GB以下なので(※Glueがシステム的に利用している分もあるので、実際に利用できる分はもっと少ない)、どの道部分的に置換していく処理は必要になります。
対応方針
結論から言うと、S3のマルチパートアップロード機能を利用して処理を行うことができます。
公式のドキュメントを確認すると、マルチパートアップロードについて以下のように説明がされています。
マルチパートアップロードを使用すると、単一のオブジェクトをパートのセットとしてアップロードすることができます。
要は、part1, part2, ...といったように複数のパートに分けてファイルのアップロードを行い、最後に1つのオブジェクトとしてまとめることができる機能になります。
これを利用すれば、例えばS3上にある20GBのfoo.csv
に対し
foo.csv
の最初の50MBメモリに読み込んで置換処理を行い、part1としてアップロードfoo.csv
の次の50MBを読み込んで置換処理を行い、part2としてアップロード- (20GB分処理するまで繰り返し)
- 最後に1つのファイルにまとめる
といったようにメモリに読み出す量を絞りながら、かつ一時ディレクトリを利用せずに置換処理を行いアップロードを実行することができます。
注意点
ただし、マルチパートアップロードは5MB未満のファイルには対応しておりません。
したがって、置換処理の対象が5MB未満のファイルであった場合、マルチパートアップロードを利用しないような設計にする必要があります。
Pythonのコード
上記で説明した対応方針に沿って処理を行うPythonのコードは以下の通りです。
import os import boto3 s3_client = None def _replace_and_upload( bucket_name, object_key, new_object_key, str_to_find, str_to_replace ): global s3_client # download file print("download file") data = s3_client.get_object(Bucket=bucket_name, Key=object_key) data = data["Body"].read() # replace data data = data.replace(str_to_find.encode(), str_to_replace.encode()) # upload file print("upload file") s3_client.put_object(Body=data, Bucket=bucket_name, Key=new_object_key) return def _replace_and_upload_chunks( upload_id, buffer_size, file_size, bucket_name, object_key, new_object_key, str_to_find, str_to_replace, ): global s3_client part_info = {"Parts": []} part_number = 1 offset = 0 overlap_size = len(str_to_find) carry_over_data = b'' while offset < file_size: print(f"part_number = {part_number}") # calculate the last byte of the current chunk with overlap if part_number == 1: download_end = buffer_size + overlap_size else: download_end = min(offset + buffer_size, file_size) # download data from S3 by specifying byte range range_header = f"bytes={offset}-{download_end - 1}" print(f"download part_number = {part_number}, {range_header}") data = s3_client.get_object( Bucket=bucket_name, Key=object_key, Range=range_header ) chunk = data["Body"].read() if part_number > 1: # add carry_over_data from previous chunk chunk = carry_over_data + chunk carry_over_data = chunk[-overlap_size:] # replace data in chunk chunk = chunk.replace(str_to_find.encode(), str_to_replace.encode()) if part_number == 1: chunk = chunk[:-overlap_size] else: chunk = chunk[overlap_size:] # write processed chunks to new file stream print(f"upload part_number = {part_number}") response = s3_client.upload_part( Body=chunk, Bucket=bucket_name, Key=new_object_key, PartNumber=part_number, UploadId=upload_id, ) # record information of uploaded parts part_info["Parts"].append({"PartNumber": part_number, "ETag": response["ETag"]}) offset += buffer_size part_number += 1 del chunk return part_info def replace_character_in_s3(s3_path, str_to_find, str_to_replace): global s3_client s3_client = boto3.client("s3") bucket_name, object_key = s3_path.replace("s3://", "").split("/", 1) splited_path = os.path.splitext(object_key) new_object_key = splited_path[0] + "_replaced" + splited_path[1] # set the buffer size handled in memory buffer_size = 50 * 1024 * 1024 # 50MB # get file size response = s3_client.head_object(Bucket=bucket_name, Key=object_key) file_size = response["ContentLength"] # single upload if file_size < buffer_size: _replace_and_upload( bucket_name, object_key, new_object_key, str_to_find, str_to_replace ) # multipart upload else: try: multipart_upload = s3_client.create_multipart_upload( Bucket=bucket_name, Key=new_object_key ) upload_id = multipart_upload["UploadId"] part_info = _replace_and_upload_chunks( upload_id, buffer_size, file_size, bucket_name, object_key, new_object_key, str_to_find, str_to_replace, ) # notify completion when all parts are uploaded s3_client.complete_multipart_upload( Bucket=bucket_name, Key=new_object_key, UploadId=upload_id, MultipartUpload=part_info, ) except Exception as e: # cancel multipart upload if error occurs s3_client.abort_multipart_upload( Bucket=bucket_name, Key=new_object_key, UploadId=upload_id ) raise e return bucket_name, new_object_key if __name__ == "__main__": replace_character_in_s3( "s3://cm-da-uehara/foo/test.csv", str_to_find=r"\"", str_to_replace=r'""', )
上記コードについて、メモリに読み出すbuffer_size
は50MBに設定しています。
ファイルが50MB未満であればマルチパートアップロードを行わず、一回で処理を行うようにしています。(これで、5MB未満のファイルであっても問題ありません)
50MB以上であれば、50MBずつファイルを読み込んで置換処理を行い、マルチパートアップロードにより部分的にアップロードを行うようにしています。
また、boto3のS3 Clientのget_object()
関数は引数にRange
を取ることができ、GetObject操作でのバイト範囲を指定することができますので、部分的な読み出しについてはそちらを利用しています。
なお、chunk間でstr_to_find
の文字列が跨がっている際に、それを見つけて置き換えるためoverlap_size
とcarry_over_data
を利用しています。具体的には、単純に50MB読み込むだけではstr_to_find
がチャンクの境目に位置している場合は正しく置き換えができないので、chunkを少し重ねて読み込み、次のchunkと組み合わせて処理できるようにしています。
ただし、上記は置換前と置換後の文字列の長さが同じことを前提としていることにご注意下さい。置換前後で文字列の長さが異なる場合は、別途それを考慮した処理にする必要があります。
肝心の置換の内容は\"
という文字列を""
にするという処理にしています。("
に対するエスケープ文字を\
ではなく"
にする置換処理)
その他の細かい処理についてはコメントに記載の通りです。
動作テスト
S3バケットに、軽めのcsvファイルであるtest_small.csv
と、比較的重めのファイルであるtest_medium.csv
ファイルを用意します。
まず、test_small.csv
を対象としてプログラムを実行します。
$ python test.py download file upload file
S3を確認すると、正しく置換が実行されたtest_small_replaced.csv
が作成されていることが確認できました。
次に、test_medium.csv
を対象としてプログラムを実行します。
$ python test.py part_number = 1 download part_number = 1, bytes=0-52428801 upload part_number = 1 part_number = 2 download part_number = 2, bytes=52428800-104857599 upload part_number = 2 part_number = 3 download part_number = 3, bytes=104857600-157286399 upload part_number = 3 part_number = 4 download part_number = 4, bytes=157286400-209715199 upload part_number = 4 part_number = 5 download part_number = 5, bytes=209715200-229528106 upload part_number = 5
部分的に処理が実行され、test_medium_replaced.csv
が作成されていることを確認できました。
ファイルの内容についても、正しく置換処理が実施されていることを確認しました。
最後に
今回は、S3上のファイル内の文字列の置換を部分的にメモリに読み出しながら実施する方法をご紹介しました。
参考になりましたら幸いです。